use std::time::Duration;

use crate::handlers::{log_error, render, HtmlView, Result};
use crate::stores::redisx;
use crate::stores::sql::{self, MpBook};
use crate::{misc::utils, SharedState, UserSession, WsMsg, WsUserInfo, SESSION_KEY};
use askama::Template;
use axum::extract::ws::{Message, WebSocket};
use axum::extract::{Path, Query, State, WebSocketUpgrade};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::Form;
use axum_csrf::CsrfToken;
use axum_session::{Session, SessionRedisPool};
use futures::channel::mpsc;
use futures::{sink::SinkExt, stream::StreamExt};
use headers::HeaderMap;
use serde::{Deserialize, Serialize};

use super::EcodeParams;

const BOOK_MESSAGE_HASH: &str = "book_chat_hash"; // 存放用户昵称，和时间，计划任务根据这个进行筛选删除。
const BOOK_MESSAGE_LIST: &str = "book_chat_list"; // 存放离线消息

#[derive(Template, Deserialize, Serialize)]
#[template(path = "frontend/add_book.html")]
pub struct AddBookTemplate {
    pub err: String,
    pub authenticity_token: String,
}

#[derive(Template)]
#[template(path = "frontend/list_book.html")]
pub struct ListBookTemplate {
    pub prev_page: String,
    pub next_page: String,
    pub books: Vec<MpBook>,
}

#[derive(Template)]
#[template(path = "frontend/my.html")]
pub struct MyTemplate {
    pub prev_page: String,
    pub next_page: String,
    pub books: Vec<MpBook>,
}

#[derive(Template)]
#[template(path = "frontend/detail_book.html")]
pub struct DetailBookTemplate {
    pub id: i64,
    pub name: String,
    pub press: String,
    pub isbn: String,
    pub status: i32, // 1 非本人 2 本人已共享 3 本人未共享
    pub username: String,
    pub err: String,
}

#[derive(Template)]
#[template(path = "mp.md")]
pub struct MpMdTemplate {
    pub name: String,
    pub press: String,
    pub isbn: String,
    pub acode: String,
}

#[derive(Template)]
#[template(path = "mp.txt")]
pub struct MpTxtTemplate {
    pub name: String,
    pub press: String,
    pub isbn: String,
    pub acode: String,
}

#[derive(Template)]
#[template(path = "frontend/chat.html")]
pub struct ChatTemplate {
    pub yourname: String,
    pub myname: String,
}

#[derive(Template)]
#[template(path = "frontend/chat2.html")]
pub struct Chat2Template {}

pub async fn add_book_html(
    token: CsrfToken,
    session: Session<SessionRedisPool>,
    params: Query<EcodeParams>,
) -> impl IntoResponse {
    let authenticity_token = token.authenticity_token().unwrap();
    tracing::debug!("the csrf token {:?}", authenticity_token);
    session.set("authenticity_token", authenticity_token.clone());
    tracing::debug!("the params {:?}", params);

    let err_code = params.ecode.unwrap_or(10000);
    let err = match err_code {
        10000 => "",
        10003 => "名称不能为空",
        10004 => "出版社不能为空",
        10005 => "ISBN不能为空",
        10006 => "表单Token无效",
        10007 => "ISBN相同，请勿重复提交",
        _ => "系统错误",
    };
    let tmpl = AddBookTemplate {
        err: err.to_string(),
        authenticity_token,
    };
    // let handler_name = "frontend/book/addbook";
    // render(tmpl).map_err(log_error(handler_name))
    (token, tmpl).into_response()
}

#[derive(Debug, Deserialize)]
pub struct AddBook {
    pub authenticity_token: String,
    pub name: String,
    pub press: String,
    pub isbn: String,
}
pub async fn add_book_action(
    token: CsrfToken,
    session: Session<SessionRedisPool>,
    State(state): State<SharedState>,
    Form(book): Form<AddBook>,
) -> Result<(StatusCode, HeaderMap, ())> {
    tracing::debug!("form {:?}", book);
    let mut headers = HeaderMap::new();
    let mut url = String::new();
    let ecode;
    let authenticity_token: String = session.get("authenticity_token").unwrap_or_default();
    if let Err(err) = token.verify(&book.authenticity_token) {
        tracing::debug!("the csrf err {}", err);
        ecode = 10006;
        let lurl = format!("/addbook?ecode={}", ecode);
        url.push_str(&lurl);
        headers.insert(axum::http::header::LOCATION, url.parse().unwrap());
        return Ok((StatusCode::FOUND, headers, ()));
    } else if let Err(_) = token.verify(&authenticity_token) {
        ecode = 10006;
        let lurl = format!("/addbook?ecode={}", ecode);
        url.push_str(&lurl);
        headers.insert(axum::http::header::LOCATION, url.parse().unwrap());
        return Ok((StatusCode::FOUND, headers, ()));
    } else {
        // we remove it to only allow one post per generated token.
        session.remove("authenticity_token");
    }

    // 获取上传的form表单
    // 获取openid
    let rabbit_session = session.get(SESSION_KEY).unwrap_or(UserSession::default());
    tracing::debug!("the session is {:?}", rabbit_session);

    if book.isbn == "" || book.name == "" || book.press == "" {
        if book.name == "" {
            ecode = 10003;
        } else if book.press == "" {
            ecode = 10004;
        } else {
            ecode = 10005;
        }
        let lurl = format!("/addbook?ecode={}", ecode);
        url.push_str(&lurl);
        headers.insert(axum::http::header::LOCATION, url.parse().unwrap());
        return Ok((StatusCode::FOUND, headers, ()));
    }
    let openid = &rabbit_session.openid;
    if openid != "" {
        // 将数据插入数据库
        let pool = &state.db;
        let book_cnt = sql::query_book_count_v1(pool, openid.clone(), book.isbn.clone()).await?;
        if book_cnt > 0 {
            ecode = 10007;
            let lurl = format!("/addbook?ecode={}", ecode);
            url.push_str(&lurl);
            headers.insert(axum::http::header::LOCATION, url.parse().unwrap());
            return Ok((StatusCode::FOUND, headers, ()));
        }
        sql::add_book(
            pool,
            openid.clone(),
            book.name.clone(),
            book.press.clone(),
            book.isbn.clone(),
            0,
        )
        .await?;
        // 添加随机码，并发送审核通知
        let code = utils::get_rand_num(6);
        let acode = format!("acode-{}", code); // 审核码
        let client = &state.rs;
        redisx::set_ex(client, acode, book.isbn.clone(), 60 * 60 * 24).await?;
        // let mp_tmpl = MpMdTemplate {
        //     name: book.name,
        //     press: book.press,
        //     isbn: book.isbn,
        //     acode: code,
        // };
        // let md_str = mp_tmpl.render()?;
        let mp_tmpl = MpTxtTemplate {
            name: book.name,
            press: book.press,
            isbn: book.isbn,
            acode: code,
        };
        let md_str = mp_tmpl.render()?;
        tracing::debug!("the md str is {}", md_str);
        let conf = &state.conf;
        // utils::send_mp_md_notify(&conf.mp.robot_url, md_str).await?;
        utils::send_mp_notify(&conf.mp.robot_url, md_str).await?;
        url.push_str("/my?page=1&page_size=5");
    } else {
        url.push_str("/login");
    }

    headers.insert(axum::http::header::LOCATION, url.parse().unwrap());
    Ok((StatusCode::FOUND, headers, ()))
}

pub async fn share_book_action(
    session: Session<SessionRedisPool>,
    State(state): State<SharedState>,
    Path(id): Path<i64>,
) -> Result<(StatusCode, HeaderMap, ())> {
    let mut headers = HeaderMap::new();
    let mut url = String::new();

    let rabbit_session = session.get(SESSION_KEY).unwrap_or(UserSession::default());
    let openid = &rabbit_session.openid;
    if openid != "" {
        let pool = &state.db;
        let book = sql::get_book_detail(pool, id).await?;
        if book.status != 1 {
            // 图书还在审核中，请稍后再分享
            let pstr = format!("/detailbook/{}?ecode={}", id, 10008);
            url.push_str(&pstr);
        } else {
            sql::share_book_by_id(pool, openid.clone(), id).await?;
            url.push_str("/my?page=1&page_size=5");
        }
    } else {
        url.push_str("/login");
    }

    headers.insert(axum::http::header::LOCATION, url.parse().unwrap());
    Ok((StatusCode::FOUND, headers, ()))
}

pub async fn delete_book_action(
    session: Session<SessionRedisPool>,
    State(state): State<SharedState>,
    Path(id): Path<i64>,
) -> Result<(StatusCode, HeaderMap, ())> {
    let mut headers = HeaderMap::new();
    let mut url = String::new();

    let rabbit_session = session.get(SESSION_KEY).unwrap_or(UserSession::default());
    let openid = &rabbit_session.openid;
    if openid != "" {
        let pool = &state.db;
        sql::delete_book_by_id(pool, openid.clone(), id).await?;
        url.push_str("/my?page=1&page_size=5");
    } else {
        url.push_str("/login");
    }

    headers.insert(axum::http::header::LOCATION, url.parse().unwrap());
    Ok((StatusCode::FOUND, headers, ()))
}

#[derive(Debug, Deserialize)]
pub struct Paging {
    pub page: i32,
    pub page_size: i32,
}
pub async fn list_book_html(
    State(state): State<SharedState>,
    Query(paging): Query<Paging>,
) -> Result<HtmlView> {
    let pool = &state.db;
    let books = sql::get_pub_book_list(pool, paging.page, paging.page_size).await?;
    tracing::debug!("the books {:?}", books);
    let prev_page = format!("page={}&page_size={}", paging.page - 1, paging.page_size);
    let next_page = format!("page={}&page_size={}", paging.page + 1, paging.page_size);
    let tmpl = ListBookTemplate {
        books,
        prev_page,
        next_page,
    };

    let handler_name = "frontend/book/listbook";
    render(tmpl).map_err(log_error(handler_name))
}

pub async fn my_html(
    session: Session<SessionRedisPool>,
    State(state): State<SharedState>,
    Query(paging): Query<Paging>,
) -> Result<HtmlView> {
    let rabbit_session = session.get(SESSION_KEY).unwrap_or(UserSession::default());
    tracing::debug!("the session {:?}", rabbit_session);
    let openid = rabbit_session.openid;
    let pool = &state.db;
    let books = sql::get_my_book_list(pool, openid, paging.page, paging.page_size).await?;
    tracing::debug!("the books {:?}", books);

    let prev_page = format!("page={}&page_size={}", paging.page - 1, paging.page_size);
    let next_page = format!("page={}&page_size={}", paging.page + 1, paging.page_size);
    let tmpl = MyTemplate {
        books,
        prev_page,
        next_page,
    };
    let handler_name = "frontend/book/my";
    render(tmpl).map_err(log_error(handler_name))
}

pub async fn detail_book_html(
    session: Session<SessionRedisPool>,
    State(state): State<SharedState>,
    Path(id): Path<i64>,
    params: Query<EcodeParams>,
) -> Result<HtmlView> {
    let rabbit_session = session.get(SESSION_KEY).unwrap_or(UserSession::default());
    tracing::debug!("the session {:?}", rabbit_session);
    let openid = rabbit_session.openid;
    let pool = &state.db;

    let book = sql::get_book_detail(pool, id).await?;
    tracing::debug!("the book {:?}", book);
    let user = sql::query_user(pool, book.openid.clone()).await?;
    let status = if openid != book.openid {
        1
    } else if book.isshare == 1 {
        2
    } else {
        3
    };
    tracing::debug!("status {}", status);
    let err_code = params.ecode.unwrap_or(10000);
    let err = match err_code {
        10000 => "",
        10008 => "图书还在审核中，请稍后再分享",
        _ => "系统错误",
    };
    let tmpl = DetailBookTemplate {
        id: book.id,
        name: book.name,
        press: book.press,
        isbn: book.isbn,
        status,
        username: user.nickname,
        err: err.to_string(),
    };
    let handler_name = "frontend/book/detail";
    render(tmpl).map_err(log_error(handler_name))
}

pub async fn chat_book_html(
    session: Session<SessionRedisPool>,
    Path(name): Path<String>,
) -> Result<HtmlView> {
    let rabbit_session = session.get(SESSION_KEY).unwrap_or(UserSession::default());
    let tmpl = ChatTemplate {
        myname: rabbit_session.name.clone(),
        yourname: name,
    };
    let handler_name = "frontend/book/chat";
    render(tmpl).map_err(log_error(handler_name))
}

pub async fn chat2_book_html() -> Result<HtmlView> {
    let tmpl = Chat2Template {};
    let handler_name = "frontend/book/chat2";
    render(tmpl).map_err(log_error(handler_name))
}

pub async fn chat_book_websocket(
    ws: WebSocketUpgrade,
    State(state): State<SharedState>,
    session: Session<SessionRedisPool>,
) -> impl IntoResponse {
    let rabbit_session = session.get(SESSION_KEY).unwrap_or(UserSession::default());
    tracing::debug!("the session {:?}", rabbit_session);
    let openid = rabbit_session.openid;
    let name = rabbit_session.name;
    tracing::debug!("{} open websocket 0", name);
    ws.on_upgrade(|socket| websocket(socket, state, openid, name))
}

async fn websocket(stream: WebSocket, state: SharedState, openid: String, name: String) {
    // By splitting, we can send and receive at the same time.
    tracing::debug!("{} open websocket", name);
    let (mut sender, mut receiver) = stream.split();
    let now = utils::get_timestamp();
    let rs = &state.rs;
    let hkey = BOOK_MESSAGE_HASH.to_string();
    let ts_str = redisx::hget(rs, &hkey, name.clone())
        .await
        .unwrap_or("".to_string());
    tracing::debug!("ts_str is {:?}", ts_str);
    if ts_str != "" {
        tracing::debug!("离线消息1");
        let ts: i64 = ts_str.parse().unwrap();
        let lkey = format!("{}_{}", BOOK_MESSAGE_LIST, name.clone());
        if now - ts < 3 * 24 * 60 * 60 {
            tracing::debug!("离线消息4");
            loop {
                let msg_str = redisx::rpop(rs, &lkey).await.unwrap_or("".to_string());
                if msg_str == "" {
                    break;
                }
                if sender.send(Message::Text(msg_str)).await.is_err() {
                    break;
                }
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                tracing::debug!("离线消息5");
            }
        }
        tracing::debug!("离线消息2");
        let _ = redisx::hdel(rs, &hkey, name.clone()).await;
        let _ = redisx::del(rs, &lkey).await;
        tracing::debug!("离线消息3");
    }
    tracing::debug!("{} open websocket2", name);
    let (tx, mut rx) = mpsc::unbounded();

    let ws_userinfo = WsUserInfo {
        openid: openid.clone(),
        name: name.clone(),
        tx: tx.clone(),
    };
    let rstate = state.clone();
    state
        .ws
        .peer_map
        .lock()
        .unwrap()
        .insert(name.clone(), ws_userinfo);
    tracing::debug!("{} open websocket3", name);
    // 接收到消息
    let ws_task = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(50));
        loop {
            tokio::select! {
                    msg = rx.next() => {
                        match msg {
                            Some(msg) => {
                                tracing::debug!("我通道收到了消息{:?}", msg);
                                // In any websocket error, break loop.
                                let msg_str = serde_json::to_string(&msg).unwrap();
                                if sender.send(Message::Text(msg_str)).await.is_err() {
                                    break;
                                }
                                tracing::debug!("channel websocket recv");
                            }
                            None => break,
                        }
                    }
                    _ = interval.tick() => {
                        if sender.send(Message::Ping(vec![211])).await.is_err() {
                            tracing::debug!("websocket ping error");
                            break;
                        }
                    }
                    text=receiver.next()=>{
                        match text {
                            Some(Ok(Message::Text(text))) => {
                                // 客户端消息需要按照规则来
                                tracing::debug!("我收到了消息{:?}", text);
                                let ws_msg: WsMsg = serde_json::from_str(&text).unwrap();
                                let dest_name = ws_msg.dest.clone();
                                let user_info = get_userinfo(&rstate, &dest_name);
                                match user_info {
                                    Some(user) => {
                                        tracing::debug!("user info {:?}", user);
                                        let tx = &user.tx;
                                        tx.unbounded_send(ws_msg.clone()).unwrap();
                                        tracing::debug!("channel websocket send2");
                                    }
                                    None => {
                                        let dest_name = dest_name.clone();
                                        if dest_name.len() == 8 {
                                            tracing::debug!("channel websocket send3 {}", dest_name);
                                            let now = utils::get_timestamp();
                                            let rs = &rstate.rs;
                                            let _ = redisx::hset(
                                                rs,
                                                &BOOK_MESSAGE_HASH.to_string(),
                                                dest_name.clone(),
                                                now.to_string(),
                                            )
                                            .await;
                                            let hkey = format!("{}_{}", BOOK_MESSAGE_LIST, dest_name.clone());
                                            let _ = redisx::lpush(rs, &hkey, text.clone()).await;
                                        }
                                        tracing::debug!("channel websocket send4");
                                    }
                                }
                                tracing::debug!("channel websocket send");
                            }
                            Some(Ok(Message::Close(_)))=>{
                                tracing::debug!("receive close message");
                                break;
                            },
                            Some(Err(e))=> {
                                tracing::debug!("the err is {:?}",e);
                            },
                            Some(_)=>{},
                            None => break,
                        }
                    }
            }
        }
    });
    // 发送消息
    // let mut recv_task = tokio::spawn(async move {
    //     while let Some(Ok(Message::Text(text))) = receiver.next().await {
    //         // 客户端消息需要按照规则来
    //         tracing::debug!("我收到了消息{:?}", text);
    //         let ws_msg: WsMsg = serde_json::from_str(&text).unwrap();
    //         let dest_name = ws_msg.dest.clone();
    //         let user_info = get_userinfo(&rstate, &dest_name);
    //         match user_info {
    //             Some(user) => {
    //                 tracing::debug!("user info {:?}", user);
    //                 let tx = &user.tx;
    //                 tx.unbounded_send(ws_msg.clone()).unwrap();
    //                 tracing::debug!("channel websocket send2");
    //             }
    //             None => {
    //                 let dest_name = dest_name.clone();
    //                 if dest_name.len() == 8 {
    //                     tracing::debug!("channel websocket send3 {}", dest_name);
    //                     let now = utils::get_timestamp();
    //                     let rs = &rstate.rs;
    //                     let _ = redisx::hset(
    //                         rs,
    //                         &BOOK_MESSAGE_HASH.to_string(),
    //                         dest_name.clone(),
    //                         now.to_string(),
    //                     )
    //                     .await;
    //                     let hkey = format!("{}_{}", BOOK_MESSAGE_LIST, dest_name.clone());
    //                     let _ = redisx::lpush(rs, &hkey, text.clone()).await;
    //                 }
    //                 tracing::debug!("channel websocket send4");
    //             }
    //         }
    //         tracing::debug!("channel websocket send");
    //     }
    // });

    tracing::debug!("{} open websocket4", name);
    // If any one of the tasks run to completion, we abort the other.
    // tokio::select! {
    //     _ = (&mut send_task) => recv_task.abort(),
    //     _ = (&mut recv_task) => send_task.abort(),
    // };

    // 等待结束
    let _ = ws_task.await;

    tracing::debug!("{} open websocket5", name);
    // 清理掉信息
    state.ws.peer_map.lock().unwrap().remove(&name);
}

// 获取用户信息
fn get_userinfo(state: &SharedState, name: &str) -> Option<WsUserInfo> {
    let peers = state.ws.peer_map.lock().unwrap();
    let user_info = peers.get(name);
    // 重点
    let info = match user_info {
        Some(user) => Some(user.into()),
        None => None,
    };
    info
}
